package com.nx.platform.es.biz.esspider.reactor;


import com.google.common.base.Preconditions;
import com.nx.platform.es.biz.esspider.deliver.Deliverer;
import com.nx.platform.es.biz.esspider.deliver.DelivererImpl;
import com.nx.platform.es.biz.esspider.process.ProcessorServiceImpl;
import com.nx.platform.es.biz.esspider.recycle.Recycler;
import com.nx.platform.es.biz.esspider.resource.DBManager;
import com.nx.platform.es.biz.esspider.resource.DBManagerImpl;
import com.nx.platform.es.biz.esspider.sink.Sinks;
import com.nx.platform.es.biz.esspider.source.InvokeSourceService;
import com.nx.platform.es.biz.esspider.source.MQSourceService;
import com.nx.platform.es.biz.esspider.source.RecycleSourceService;
import com.nx.platform.es.common.utils.YamlParser;
import com.nx.platform.es.system.config.ConfigCenter;
import com.nx.platform.es.biz.esspider.handler.Handlers;
import com.nx.platform.es.biz.esspider.handler.HandlersImpl;
import com.nx.platform.es.biz.esspider.recycle.RecyclerImpl;
import com.nx.platform.es.biz.esspider.sink.SinksImpl;
import com.nx.platform.es.common.utils.Constants;
import com.nx.platform.es.common.utils.MoreMaps;
import com.nx.platform.es.service.ESClientManager;
import com.nx.platform.es.service.impl.ESClientManagerImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.util.List;
import java.util.Map;

/**
 * reactor启动类
 * @author
 * @date 2018/04/13
 */
@Slf4j
public class NotifyReactor extends Reactor {

    private final String configFilePath;
    private final String dataFilePath;
    private final String dbConfigKey;
    private final String esConfigKey;
    private final String handlerConfigKey;
    private final String sinkConfigKey;
    private final String processorConfigKey;
    private final String mqSourceConfigKey;
    private final String debugSwitchConfigKey;

    private NotifyReactor(
            String configFilePath, String dataFilePath, String configKeyPrefix,
            String dbConfigKey, String esConfigKey,
            String handlerConfigKey, String sinkConfigKey, String processorConfigKey,
            String mqSourceConfigKey, String debugSwitchConfigKey) {
        this.configFilePath = configFilePath;
        this.dataFilePath = dataFilePath;
        this.dbConfigKey = configKeyPrefix + "." + dbConfigKey;
        this.esConfigKey = configKeyPrefix + "." + esConfigKey;
        this.processorConfigKey = configKeyPrefix + "." + processorConfigKey;
        this.handlerConfigKey = configKeyPrefix + "." + handlerConfigKey;
        this.sinkConfigKey = configKeyPrefix + "." + sinkConfigKey;
        this.mqSourceConfigKey = configKeyPrefix + "." + mqSourceConfigKey;
        this.debugSwitchConfigKey = configKeyPrefix + "." + debugSwitchConfigKey;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    protected void init() {
        log.info("notifyReactor init begin==================");
        // resources
        DBManager dbManager = new DBManagerImpl(configFilePath, dbConfigKey);
        ESClientManager esManager = new ESClientManagerImpl();

        // handlers, sinks, deliverer, recycler
        Handlers handlers = new HandlersImpl(handlerConfigKey, dbManager);
        Sinks sinks = new SinksImpl(sinkConfigKey, esManager);
        Deliverer deliverer = new DelivererImpl();
        Recycler recycler = new RecyclerImpl(dataFilePath);

        // sources
        addSource(new MQSourceService(deliverer, true, mqSourceConfigKey));
        addSource(new InvokeSourceService(deliverer));
        addSource(new RecycleSourceService(deliverer, recycler));

        // processors 用于实际处理的线程池。每个索引主题一个processor
        List<Map<?, ?>> processors = YamlParser.parseToList(ConfigCenter.getConfig(processorConfigKey).orElse(""));
        processors.stream().map(map -> Pair.of(MoreMaps.getString(map, "code"), map))
                .map(p -> new ProcessorServiceImpl(p.getLeft(), p.getRight(), handlers, sinks, recycler, debugSwitchConfigKey))
                .peek(deliverer::register).forEach(this::addProcessor);
    }

    public static class Builder {

        private String configFilePath;
        private String dataFilePath;
        private String configNamespace;

        private Builder() {}

        public NotifyReactor build() {
            Preconditions.checkArgument(StringUtils.isNotBlank(configFilePath));
            Preconditions.checkArgument(StringUtils.isNotBlank(dataFilePath));
            Preconditions.checkArgument(StringUtils.isNotBlank(configNamespace));
            return new NotifyReactor(
                    configFilePath, dataFilePath, configNamespace,
                    Constants.KEY_DB, Constants.ES_CLIENT,
                    Constants.KEY_HANDLERS, Constants.KEY_SINKS, Constants.KEY_PROCESSORS,
                    Constants.KEY_MQ, Constants.KEY_DEBUG);
        }

        public Builder configFilePath(String configFilePath) {
            this.configFilePath = configFilePath;
            return this;
        }

        public Builder dataFilePath(String dataFilePath) {
            this.dataFilePath = dataFilePath;
            return this;
        }

        public Builder configNamespace(String configNamespace) {
            this.configNamespace = configNamespace;
            return this;
        }

    }

}
